home *** CD-ROM | disk | FTP | other *** search
/ PC World Komputer 2010 April / PCWorld0410.iso / hity wydania / Ubuntu 9.10 PL / karmelkowy-koliberek-9.10-netbook-remix-PL.iso / casper / filesystem.squashfs / usr / lib / python2.6 / multiprocessing / queues.pyc (.txt) < prev    next >
Encoding:
Python Compiled Bytecode  |  2009-11-11  |  11.4 KB  |  386 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.6)
  3.  
  4. __all__ = [
  5.     'Queue',
  6.     'SimpleQueue',
  7.     'JoinableQueue']
  8. import sys
  9. import os
  10. import threading
  11. import collections
  12. import time
  13. import atexit
  14. import weakref
  15. from Queue import Empty, Full
  16. import _multiprocessing
  17. from multiprocessing import Pipe
  18. from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
  19. from multiprocessing.util import debug, info, Finalize, register_after_fork
  20. from multiprocessing.forking import assert_spawning
  21.  
  22. class Queue(object):
  23.     
  24.     def __init__(self, maxsize = 0):
  25.         if maxsize <= 0:
  26.             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
  27.         
  28.         self._maxsize = maxsize
  29.         (self._reader, self._writer) = Pipe(duplex = False)
  30.         self._rlock = Lock()
  31.         self._opid = os.getpid()
  32.         if sys.platform == 'win32':
  33.             self._wlock = None
  34.         else:
  35.             self._wlock = Lock()
  36.         self._sem = BoundedSemaphore(maxsize)
  37.         self._after_fork()
  38.         if sys.platform != 'win32':
  39.             register_after_fork(self, Queue._after_fork)
  40.         
  41.  
  42.     
  43.     def __getstate__(self):
  44.         assert_spawning(self)
  45.         return (self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid)
  46.  
  47.     
  48.     def __setstate__(self, state):
  49.         (self._maxsize, self._reader, self._writer, self._rlock, self._wlock, self._sem, self._opid) = state
  50.         self._after_fork()
  51.  
  52.     
  53.     def _after_fork(self):
  54.         debug('Queue._after_fork()')
  55.         self._notempty = threading.Condition(threading.Lock())
  56.         self._buffer = collections.deque()
  57.         self._thread = None
  58.         self._jointhread = None
  59.         self._joincancelled = False
  60.         self._closed = False
  61.         self._close = None
  62.         self._send = self._writer.send
  63.         self._recv = self._reader.recv
  64.         self._poll = self._reader.poll
  65.  
  66.     
  67.     def put(self, obj, block = True, timeout = None):
  68.         if not not (self._closed):
  69.             raise AssertionError
  70.         if not self._sem.acquire(block, timeout):
  71.             raise Full
  72.         self._sem.acquire(block, timeout)
  73.         self._notempty.acquire()
  74.         
  75.         try:
  76.             if self._thread is None:
  77.                 self._start_thread()
  78.             
  79.             self._buffer.append(obj)
  80.             self._notempty.notify()
  81.         finally:
  82.             self._notempty.release()
  83.  
  84.  
  85.     
  86.     def get(self, block = True, timeout = None):
  87.         if block and timeout is None:
  88.             self._rlock.acquire()
  89.             
  90.             try:
  91.                 res = self._recv()
  92.                 self._sem.release()
  93.                 return res
  94.             finally:
  95.                 self._rlock.release()
  96.  
  97.         elif block:
  98.             deadline = time.time() + timeout
  99.         
  100.         if not self._rlock.acquire(block, timeout):
  101.             raise Empty
  102.         self._rlock.acquire(block, timeout)
  103.         
  104.         try:
  105.             if not block or deadline - time.time():
  106.                 pass
  107.             if not self._poll(0):
  108.                 raise Empty
  109.             self._poll(0)
  110.             res = self._recv()
  111.             self._sem.release()
  112.             return res
  113.         finally:
  114.             self._rlock.release()
  115.  
  116.  
  117.     
  118.     def qsize(self):
  119.         return self._maxsize - self._sem._semlock._get_value()
  120.  
  121.     
  122.     def empty(self):
  123.         return not self._poll()
  124.  
  125.     
  126.     def full(self):
  127.         return self._sem._semlock._is_zero()
  128.  
  129.     
  130.     def get_nowait(self):
  131.         return self.get(False)
  132.  
  133.     
  134.     def put_nowait(self, obj):
  135.         return self.put(obj, False)
  136.  
  137.     
  138.     def close(self):
  139.         self._closed = True
  140.         self._reader.close()
  141.         if self._close:
  142.             self._close()
  143.         
  144.  
  145.     
  146.     def join_thread(self):
  147.         debug('Queue.join_thread()')
  148.         if not self._closed:
  149.             raise AssertionError
  150.         if self._jointhread:
  151.             self._jointhread()
  152.         
  153.  
  154.     
  155.     def cancel_join_thread(self):
  156.         debug('Queue.cancel_join_thread()')
  157.         self._joincancelled = True
  158.         
  159.         try:
  160.             self._jointhread.cancel()
  161.         except AttributeError:
  162.             pass
  163.  
  164.  
  165.     
  166.     def _start_thread(self):
  167.         debug('Queue._start_thread()')
  168.         self._buffer.clear()
  169.         self._thread = threading.Thread(target = Queue._feed, args = (self._buffer, self._notempty, self._send, self._wlock, self._writer.close), name = 'QueueFeederThread')
  170.         self._thread.daemon = True
  171.         debug('doing self._thread.start()')
  172.         self._thread.start()
  173.         debug('... done self._thread.start()')
  174.         created_by_this_process = self._opid == os.getpid()
  175.         if not (self._joincancelled) and not created_by_this_process:
  176.             self._jointhread = Finalize(self._thread, Queue._finalize_join, [
  177.                 weakref.ref(self._thread)], exitpriority = -5)
  178.         
  179.         self._close = Finalize(self, Queue._finalize_close, [
  180.             self._buffer,
  181.             self._notempty], exitpriority = 10)
  182.  
  183.     
  184.     def _finalize_join(twr):
  185.         debug('joining queue thread')
  186.         thread = twr()
  187.         if thread is not None:
  188.             thread.join()
  189.             debug('... queue thread joined')
  190.         else:
  191.             debug('... queue thread already dead')
  192.  
  193.     _finalize_join = staticmethod(_finalize_join)
  194.     
  195.     def _finalize_close(buffer, notempty):
  196.         debug('telling queue thread to quit')
  197.         notempty.acquire()
  198.         
  199.         try:
  200.             buffer.append(_sentinel)
  201.             notempty.notify()
  202.         finally:
  203.             notempty.release()
  204.  
  205.  
  206.     _finalize_close = staticmethod(_finalize_close)
  207.     
  208.     def _feed(buffer, notempty, send, writelock, close):
  209.         debug('starting thread to feed data to pipe')
  210.         is_exiting = is_exiting
  211.         import util
  212.         nacquire = notempty.acquire
  213.         nrelease = notempty.release
  214.         nwait = notempty.wait
  215.         bpopleft = buffer.popleft
  216.         sentinel = _sentinel
  217.         if sys.platform != 'win32':
  218.             wacquire = writelock.acquire
  219.             wrelease = writelock.release
  220.         else:
  221.             wacquire = None
  222.         
  223.         try:
  224.             while None:
  225.                 
  226.                 try:
  227.                     if not buffer:
  228.                         nwait()
  229.                 finally:
  230.                     nrelease()
  231.  
  232.                 
  233.                 try:
  234.                     while None:
  235.                         obj = bpopleft()
  236.                         if obj is sentinel:
  237.                             debug('feeder thread got sentinel -- exiting')
  238.                             close()
  239.                             return None
  240.                         if wacquire is None:
  241.                             send(obj)
  242.                             continue
  243.                         wacquire()
  244.                         
  245.                         try:
  246.                             send(obj)
  247.                         finally:
  248.                             wrelease()
  249.  
  250.                     continue
  251.                     except IndexError:
  252.                         continue
  253.                     
  254.                 except Exception:
  255.                     e = None
  256.                     
  257.                     try:
  258.                         if is_exiting():
  259.                             info('error in queue thread: %s', e)
  260.                         else:
  261.                             import traceback
  262.                             traceback.print_exc()
  263.                     except Exception:
  264.                         pass
  265.                     except:
  266.                         None<EXCEPTION MATCH>Exception
  267.                     
  268.  
  269.                     None<EXCEPTION MATCH>Exception
  270.  
  271.                 return None
  272.  
  273.  
  274.     _feed = staticmethod(_feed)
  275.  
  276. _sentinel = object()
  277.  
  278. class JoinableQueue(Queue):
  279.     
  280.     def __init__(self, maxsize = 0):
  281.         Queue.__init__(self, maxsize)
  282.         self._unfinished_tasks = Semaphore(0)
  283.         self._cond = Condition()
  284.  
  285.     
  286.     def __getstate__(self):
  287.         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
  288.  
  289.     
  290.     def __setstate__(self, state):
  291.         Queue.__setstate__(self, state[:-2])
  292.         (self._cond, self._unfinished_tasks) = state[-2:]
  293.  
  294.     
  295.     def put(self, obj, block = True, timeout = None):
  296.         if not not (self._closed):
  297.             raise AssertionError
  298.         if not self._sem.acquire(block, timeout):
  299.             raise Full
  300.         self._sem.acquire(block, timeout)
  301.         self._notempty.acquire()
  302.         self._cond.acquire()
  303.         
  304.         try:
  305.             if self._thread is None:
  306.                 self._start_thread()
  307.             
  308.             self._buffer.append(obj)
  309.             self._unfinished_tasks.release()
  310.             self._notempty.notify()
  311.         finally:
  312.             self._cond.release()
  313.             self._notempty.release()
  314.  
  315.  
  316.     
  317.     def task_done(self):
  318.         self._cond.acquire()
  319.         
  320.         try:
  321.             if not self._unfinished_tasks.acquire(False):
  322.                 raise ValueError('task_done() called too many times')
  323.             self._unfinished_tasks.acquire(False)
  324.             if self._unfinished_tasks._semlock._is_zero():
  325.                 self._cond.notify_all()
  326.         finally:
  327.             self._cond.release()
  328.  
  329.  
  330.     
  331.     def join(self):
  332.         self._cond.acquire()
  333.         
  334.         try:
  335.             if not self._unfinished_tasks._semlock._is_zero():
  336.                 self._cond.wait()
  337.         finally:
  338.             self._cond.release()
  339.  
  340.  
  341.  
  342.  
  343. class SimpleQueue(object):
  344.     
  345.     def __init__(self):
  346.         (self._reader, self._writer) = Pipe(duplex = False)
  347.         self._rlock = Lock()
  348.         if sys.platform == 'win32':
  349.             self._wlock = None
  350.         else:
  351.             self._wlock = Lock()
  352.         self._make_methods()
  353.  
  354.     
  355.     def empty(self):
  356.         return not self._reader.poll()
  357.  
  358.     
  359.     def __getstate__(self):
  360.         assert_spawning(self)
  361.         return (self._reader, self._writer, self._rlock, self._wlock)
  362.  
  363.     
  364.     def __setstate__(self, state):
  365.         (self._reader, self._writer, self._rlock, self._wlock) = state
  366.         self._make_methods()
  367.  
  368.     
  369.     def _make_methods(self):
  370.         recv = self._reader.recv
  371.         racquire = self._rlock.acquire
  372.         rrelease = self._rlock.release
  373.         
  374.         def get():
  375.             racquire()
  376.             
  377.             try:
  378.                 return recv()
  379.             finally:
  380.                 rrelease()
  381.  
  382.  
  383.         self.get = get
  384.  
  385.  
  386.